/*
 *  Openmysee
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */
				 
#include "echo.h"
/* Number of channels currently linked into ChannelHash/ChannelList;
 * incremented when a channel is created and decremented by freeChannel(). */
int NumNewChannel;
/* Channel hash table: each bucket is a chain linked through Channel.next,
 * indexed by hash_str() of the channel md5. */
struct Channel *ChannelHash[MAX_CHANNEL];
/* Head of the list of all channels, linked through Channel.lnext. */
struct Channel *ChannelList;
/* Defined elsewhere; used in this file as the leading component of on-disk paths. */
extern char *PREFIX;
/* Defined elsewhere; this file uses TRACKER[type].head to turn a session
 * pointer into the index passed to Clientclosure(). */
extern struct ServerDesc TRACKER[MAX_TYPE];
/* Defined elsewhere; presumably closes the client at index listnum of the
 * given type -- TODO confirm against its definition. */
extern int Clientclosure (int listnum, int type);

/*
 * Hash the first LEN bytes at STR into a bucket index for the channel table.
 *
 * Every step computes hash = hash * 32 + byte (spelled hash += hash*31 + byte).
 * The accumulator is unsigned so that the inevitable wrap-around on keys
 * longer than a few bytes is well defined; a signed accumulator would
 * overflow, which is undefined behavior in C.  The low bits, which are the
 * only ones kept by the final mask, are the same as on a wrapping
 * two's-complement signed accumulator.
 *
 * A LEN that is zero or negative hashes nothing and yields 0.
 *
 * Returns hash & (MAX_CHANNEL - 1), i.e. the mask only spreads over all
 * buckets if MAX_CHANNEL is a power of two (assumed, not checked here).
 */
inline int hash_str (unsigned char *str, int len)
{
	unsigned int hash = 0;
	for (; len > 0; len--, str++)
		hash += (hash << 5) - hash + (*str);
	return (int) (hash & (unsigned int) (MAX_CHANNEL - 1));
}

/*
 * Search the hash table PHASH for a channel whose channel_md5 matches the
 * first LEN bytes of NAME (compared with strncmp).
 *
 * Returns the first matching channel in the bucket chain, or NULL (after
 * logging) when the bucket holds no match.
 */
struct Channel *getChannel (struct Channel **phash, char *name, int len)
{
	struct Channel *cur = phash[hash_str ((unsigned char *) name, len)];

	while (cur != NULL)
	{
		if (!strncmp (name, cur->channel_md5, len))
			return cur;
		cur = cur->next;
	}
	PDEBUG("Cannot findChannel hash %.32s.\n", name);
	return NULL;
}

/*
 * Call APPLY(channel, P) on every channel stored in the hash table PHASH.
 *
 * The successor of each channel is fetched before the callback runs, so the
 * callback may unlink or free the channel it is handed.
 */
void apply_hash (struct Channel **phash, void apply(struct Channel *, void *), void *p)
{
	int bucket = 0;

	while (bucket < MAX_CHANNEL)
	{
		struct Channel *cur = phash[bucket];
		while (cur != NULL)
		{
			struct Channel *following = cur->next;
			apply (cur, p);
			cur = following;
		}
		bucket++;
	}
}

/*
 * Call APPLY(channel, P) on every channel of the list starting at PLIST,
 * following the lnext links.
 *
 * The successor is read before the callback runs, so the callback may
 * unlink or free the channel it is handed.
 */
void apply_list (struct Channel *plist, void apply(struct Channel *, void *), void *p)
{
	struct Channel *cur = plist;

	while (cur != NULL)
	{
		struct Channel *following = cur->lnext;
		apply (cur, p);
		cur = following;
	}
}

/*
 * Remove channel P from the hash table PHASH and from the list *PLIST,
 * decrement *COUNT, release everything the channel owns and free it.
 *
 * Returns 0 on success.  Returns -1, without freeing P, when P is not found
 * in its hash bucket or (after having been unlinked from the bucket) is not
 * found in the list.
 */
int freeChannel (struct Channel **phash, struct Channel **plist, int *count, struct Channel *p)
{
	struct Channel **link;
	struct Edge *edge, *following;

	/* Walk the bucket through the address of each link so that the head
	 * and the interior of the chain are handled the same way. */
	link = &phash[hash_str (p->channel_md5, MD5_LEN)];
	while (*link != NULL && *link != p)
		link = &(*link)->next;
	if (*link == NULL)
		return -1;
	*link = p->next;

	/* Same walk over the global list, which is chained through lnext. */
	link = plist;
	while (*link != NULL && *link != p)
		link = &(*link)->lnext;
	if (*link == NULL)
		return -1;
	*link = p->lnext;

	(*count) --;
#ifdef __CP_SOURCE
	if (p->db != NULL) fclose (p->db);
#endif
	if (p->pcinfo)
	{
		free_livechannel (p);
		free (p->pcinfo);
	}

	/* Drop every peer edge still attached to the channel. */
	edge = p->PeerHead;
	while (edge != NULL)
	{
		p->numclient --;
		following = edge->cnext;
		delEdge (edge);
		edge = following;
	}
	free (p);
	return 0;
}

/*
 * Look up a channel by md5 NAME (LEN bytes compared) in the global
 * ChannelHash table.  Returns the channel, or NULL when there is none.
 */
inline struct Channel *findChannel (char *name, int len)
{
	struct Channel *found = getChannel (ChannelHash, name, len);

	return found;
}

/*
 * apply_hash()/apply_list() callback: remove PC from the global channel
 * table and list and free it.  The second argument is not used.
 */
void freeLiveChannel (struct Channel *pc, void *p)
{
	(void) p;
	freeChannel (ChannelHash, &ChannelList, &NumNewChannel, pc);
}

/*
 * Free every channel registered in the global ChannelHash table by running
 * freeLiveChannel() over all buckets.
 */
void freeAllChannel ()
{
	void *unused = NULL;

	apply_hash (ChannelHash, freeLiveChannel, unused);
}

/*
 * Build the on-disk path of a live channel into BUF (capacity LEN bytes):
 *
 *     PREFIX "/" LIVE_PREFIX <first two characters of md5> "/" <md5>
 *
 * The intermediate directory (everything up to and including the last '/')
 * is created with mkdir(); its result is deliberately ignored, as the
 * directory normally already exists.
 *
 * The md5 is appended with an explicit bound so that the result never
 * exceeds LEN bytes including the terminating NUL; an unbounded strcat()
 * here could write past the end of BUF.  If the directory part alone fills
 * the buffer, nothing is appended.  A NULL BUF or non-positive LEN is a
 * no-op.
 */
inline void buildLivePath (char *buf, int len, char *md5)
{
	size_t used;

	if (buf == NULL || len <= 0)
		return;
	snprintf (buf, len, "%s/%s%.2s/", PREFIX, LIVE_PREFIX, md5);
	mkdir (buf, 0777);
	used = strlen (buf);
	if (used + 1 < (size_t) len)
		strncat (buf, md5, (size_t) len - used - 1);
}


#ifdef __CP_SOURCE
/*
 * Read the stored block with identifier ID of channel PC from pc->db into
 * BUF (capacity MAX bytes).
 *
 * The file is treated as an array of fixed-size records of
 * maxblocksize + 2*sizeof(int) bytes; a record starts with two ints
 * (block id, payload size) followed by the payload.  The record slot is
 * id % max_queue.
 *
 * Returns the payload size (msg[1]) on success, -2 when BUF is too small
 * for one record, and -1 on every other failure (no channel info, slot not
 * populated, seek/read error, inconsistent record header).
 * On success pc->upsize is increased by the payload size.
 */
int locate_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
{
	int i, *msg;
	struct LiveChannelInfo *c = pc->pcinfo;
	if (c == NULL)
		return -1;
	if (c->indisk == NULL)
	{
		/* First use: allocate the per-slot bookkeeping arrays.  The
		 * allocation check below is commented out, so this branch always
		 * logs and returns -1; freshly zeroed arrays mark no slot as
		 * populated anyway. */
		c->max_queue = MAX_QUEUE;
		c->indisk = calloc (c->max_queue, 1);
		c->bitflag = calloc ((c->max_queue+7)/8, 1);
//		if (c->indisk == NULL || c->bitflag == NULL)
		PDEBUG ("allocate memory,%p,%p\n",c->indisk, c->bitflag);
		return -1;
	}
	i = id % c->max_queue;
	/* indisk[i] == 0 means the slot is empty; otherwise, except for
	 * T_PLIST channels, it must equal id/max_queue + 1 for the slot to
	 * hold exactly this id. */
	if (c->indisk[i] == 0 || (pc->type != T_PLIST && c->indisk[i] != (id/c->max_queue + 1)))
	{
		PINFO ("empty flag %d.\n", c->indisk[i]);
		return -1;
	}

	/* NOTE(review): the comparison is done in size_t, so a negative max
	 * would be treated as a very large capacity. */
	if (pc->maxblocksize + 2*sizeof(int) > max)
	{
		PDEBUG ("too small buffer %d for %d", max, pc->maxblocksize);
		return -2;
	}

	if (fseeko (pc->db, ((off_t)(i)) * (pc->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
	{
		PDEBUG ("fseek failed.\n");
		return -1;
	}
	/* Need more than the 2-int header, and at least as many bytes as the
	 * header's size field announces.  From here on i holds the byte count
	 * returned by fread, not the slot index. */
	if ((i=fread (buf, 1, pc->maxblocksize+2*sizeof (int), pc->db)) <= 2*sizeof (int) || i < ((int *)buf)[1]+2*sizeof(int))
	{
		PDEBUG ("Only %d read [%d]\n", i, ((int *)buf)[1]);
		return -1;
	}
	msg = (int *)buf;
	/* For T_PLIST channels the stored id is overwritten with the requested one. */
	if (pc->type == T_PLIST)
		msg[0] = id;
	if (msg[0] != id || msg[1] > pc->maxblocksize || msg[1] <= 0)
	{
		PDEBUG ("Message read is [%d %d]\n", msg[0], msg[1]);
		return -1;
	}
	pc->upsize += msg[1];
	return msg[1];
}

/*
 * Store one received block of channel C in the channel's file c->db.
 *
 * BUF starts with two unsigned ints (block id, payload size) followed by
 * the payload.  The record is written at slot id % max_queue, each slot
 * being maxblocksize + 2*sizeof(int) bytes wide.  P is the session the
 * block came from and is recorded as the channel's data source.
 *
 * Returns the payload size on success, 0 when the block is rejected
 * because of its size, and -1 on missing channel info, allocation failure
 * or file errors.
 */
int saveBlock (struct Channel *c, char *buf, struct Session *p)
{
	unsigned int slot, blockid, len;
	struct LiveChannelInfo *info;

	assert (buf);
	if (c == NULL || (info = c->pcinfo) == NULL)
	{
		PDEBUG ("saveBlock c is null.\n");
		return -1;
	}

	/* Allocate the slot bookkeeping arrays on first use. */
	if (info->indisk == NULL)
	{
		info->max_queue = MAX_QUEUE;
		info->indisk = calloc (info->max_queue, 1);
		info->bitflag = calloc ((info->max_queue+7)/8, 1);
		if (!info->indisk || !info->bitflag)
			return -1;
	}

	blockid = ((unsigned int *)buf)[0];
	len = ((unsigned int *)buf)[1];
	slot = blockid % info->max_queue;
	if (blockid > 0)
		clrBit (info->bitflag, slot);
	if (len > MAX_BLOCK_SIZE || len <= MIN_BLOCK_SIZE)
	{
		PDEBUG ("saveBlock:size is %d and id is %d.\n", len, blockid);
		return 0;
	}

	/* The first accepted block fixes the record size of the channel. */
	if (c->maxblocksize == 0)
	{
		c->maxblocksize = len;
	}
	else if (len > c->maxblocksize)
	{
		PDEBUG ("saveBlock:maxblocksize is %d, size is %d and id is %d.\n", c->maxblocksize, len, blockid);
		return 0;
	}

	if (fseeko (c->db, ((off_t)(slot)) * (c->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
	{
		PDEBUG ("Error in fsseko.\n");
		return -1;
	}
	if (fwrite (buf, len+2*sizeof(int), 1, c->db) != 1)
	{
		PDEBUG ("fwrite error in saveBlock:%.32s:%s", c->channel_md5, c->fname);
		return -1;
	}

	if (info->dataSource != p)
	{
		PDEBUG ("dataSource %p is not equal p %p.\n", info->dataSource, p);
		info->dataSource = p;
	}

	fflush (c->db);
	info->total ++;
	c->downsize += len;
	if (blockid > info->maxID)
		info->maxID = blockid;
	/* Mark the slot with id/max_queue + 1; a stored 0 would read as "empty". */
	info->indisk[slot] = (blockid/info->max_queue) + 1;
	if (info->indisk[slot] == 0)
		PDEBUG("Too large id %ds", blockid);
	return len;
}

/*
 * Open (creating or truncating) the block file p->fname as p->db and clear
 * the channel's isSave flag.
 *
 * Returns 0 on success, -1 when the file cannot be opened (p->db is then
 * NULL).
 */
int init_livechannel (struct Channel *p)
{
	FILE *store = fopen (p->fname, "w+");

	p->db = store;
	if (store == (FILE *)0)
		return -1;
	p->pcinfo->isSave = 0;
	return 0;
}

/*
 * Release the live-channel resources of P: the slot bookkeeping arrays,
 * the block file on disk, the data-source session (which is detached from
 * the channel and closed) and the media data.
 *
 * p->pcinfo itself is not freed here.  Always returns 0.
 */
int free_livechannel (struct Channel *p)
{
	struct LiveChannelInfo *info = p->pcinfo;

	free (info->indisk);
	free (info->bitflag);
	unlink (p->fname);

	if (info->dataSource != NULL)
	{
		/* Detach the session from the channel before closing it. */
		info->dataSource->pc = NULL;
		PDEBUG ("close CS source for channel %.32s.\n", p->channel_md5);
		Clientclosure (info->dataSource-TRACKER[TYPE_P2PC].head, TYPE_P2PC);
		info->dataSource = NULL;
	}

	freeMedia (p);
	return 0;
}
#endif

#ifdef __SP_SOURCE
/* Defined elsewhere; used in this file as "now" for timers and SPUpdate timestamps. */
extern time_t CurrentTime;
/* Defined elsewhere; used (under TEST) as a per-index directory name below WWW_ROOT. */
extern char *NET_NAME[];
extern char *WWW_ROOT;
/* Defined elsewhere; combined with a playlist file name to derive the channel md5. */
extern char *defaultspip;
/* Incremented by hup_handler() and reset to 0 at the end of check_newplist(). */
int Changed = 1;
extern int timer_add (unsigned int t, TimerFunc process, void *entity, void *data);
extern int writeMessage (struct Session *p, struct Channel *pc, char *ptr);
extern int buildGTV (struct Channel *pc, int datalen, char *data, int type);
extern inline void freeProgram (struct Channel *, void *);
extern void send_all_spupdate (struct Channel *pc, struct SPUpdate *s);
/*
 * Signal handler: record that a signal arrived by bumping the global
 * Changed counter.  The signal number itself is not used.
 */
void hup_handler (int sig)
{
	(void) sig;
	Changed += 1;
}
/*
 * Build the path of a playlist file into BUF (capacity LEN bytes):
 *
 *     PREFIX "/" PLIST_PREFIX "/" <md5>
 *
 * The md5 is appended with an explicit bound so the result never exceeds
 * LEN bytes including the terminating NUL; an unbounded strcat() here
 * could write past the end of BUF.  If the directory part alone fills the
 * buffer, nothing is appended.  A NULL BUF or non-positive LEN is a no-op.
 */
inline void buildPListPath (char *buf, int len, char *md5)
{
	size_t used;

	if (buf == NULL || len <= 0)
		return;
	snprintf (buf, len, "%s/%s/", PREFIX, PLIST_PREFIX);
	used = strlen (buf);
	if (used + 1 < (size_t) len)
		strncat (buf, md5, (size_t) len - used - 1);
}

/*
 * Open the key-sample log "<p->fname>/keysample" of channel P for reading.
 *
 * Returns the open stream, or NULL when P is NULL, when the path exists
 * but is not a regular file, or when fopen() fails.
 *
 * The path buffer is sized with MAX_LINE and the snprintf() bound is taken
 * from the buffer itself, so the bound and the storage can never disagree.
 */
FILE * open_keyfile (struct Channel *p)
{
	char buffer[MAX_LINE];
	struct stat stbuf;
	if (p == NULL) return NULL;
	snprintf (buffer, sizeof (buffer), "%s/keysample", p->fname);
	if (stat (buffer, &stbuf) == 0)
	{
		if (!S_ISREG (stbuf.st_mode))
		{
			PDEBUG ("File %s exist and not a regular file", buffer);
			return NULL;
		}
	}
	return fopen (buffer, "r");
}

/*
 * Timer callback for a multi-playlist channel: send an SPUpdate message to
 * every peer of PC, consume the next record of the current key-sample log
 * and re-arm the timer for the next record.
 *
 * Steps:
 *  1. On the first call (s.maxBlockID == 0) the key-sample log is rewound
 *     and its first record seeds the channel's SPUpdate state.
 *  2. If the channel has peers and no update was sent yet during the
 *     current second, a P2P_SPUPDATE message
 *     [int total length][1 byte type][md5][struct SPUpdate] is built and
 *     written to every peer edge.
 *  3. The next log record is read.  At the end of a log the next member
 *     channel's log is opened (wrapping around to the first one).
 *  4. The callback is re-scheduled with timer_add().
 *
 * DATA is not used.  Returns 0 when the timer was re-armed, -1 otherwise
 * (invalid channel state, or a key-sample log could not be opened/read).
 */
int send_mplist_spupdate (struct Channel *pc, void * data)
{
	int iddiff, prev;
	char *buf, buffer[MAX_DATA];
	struct Edge *pedge;
	struct LiveChannelInfo *pcinfo;
	struct SPUpdate s;
	struct logrec lrec;
	time_t slot;

	if (pc == NULL || pc->pcinfo == NULL || pc->pcinfo->mlist == NULL
			|| pc->pcinfo->maxID == 0)
	{
		PDEBUG ("Wrong playlist data.\n");
		return -1;
	}
	pcinfo = pc->pcinfo;
	/* keyfile is left NULL when switching to the next log failed earlier;
	 * every stdio call below would dereference it. */
	if (pcinfo->keyfile == NULL)
	{
		PDEBUG ("No keysample file is open.\n");
		return -1;
	}
	if (pcinfo->s.maxBlockID == 0)
	{
		fseek (pcinfo->keyfile, 0, SEEK_SET);
		if (fread (&lrec, 1, sizeof(struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
		{
			PDEBUG ("Cannot read keysample\n");
			return -1;
		}
		/* Round the time-derived base id up to a multiple of maxID. */
		pcinfo->s.minBlockID = (((CurrentTime - FIX_MAGIC ) * 16 + pcinfo->maxID - 1) / pcinfo->maxID) * pcinfo->maxID;
		pcinfo->s.maxBlockID = lrec.id;
		if (pcinfo->s.maxKeySample == 0)
			pcinfo->s.maxKeySample = lrec.keysample;
		if (pcinfo->s.minKeySample == 0)
			pcinfo->s.minKeySample = CurrentTime;
	}

	/* Work on a copy: the message carries absolute block ids and scaled
	 * key samples, the stored state stays relative. */
	s = pcinfo->s;
	s.maxBlockID += s.minBlockID;
	s.maxKeySample = ((long long)CurrentTime) * 10000000;
	s.minKeySample = ((long long)(s.minKeySample)) * 10000000;

	if (pcinfo->updated < CurrentTime && pc->numclient > 0)
	{
		pcinfo->updated = CurrentTime;
		buf = buffer + sizeof (int);
		*(unsigned char *) buf = P2P_SPUPDATE;
		buf += sizeof (char);
		memcpy (buf, pc->channel_md5, MD5_LEN);
		buf += MD5_LEN;
		memcpy (buf, &s, sizeof (struct SPUpdate));
		buf += sizeof (struct SPUpdate);
		/* The leading int holds the total message length. */
		*(int *) buffer = buf - buffer;
		for (pedge = pc->PeerHead; pedge; pedge = pedge->cnext)
		{
			if (writeMessage (pedge->me, pc, buffer) < 0)
			{
				PDEBUG ("send SPUPDATE err.\n");
			}
		}
		PINFO ("Send spupdate in %d, %d.\n", (int)(pcinfo->s.maxKeySample), (int)(pcinfo->s.minKeySample));
	}

	if (fread (&lrec, 1, sizeof (struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
	{
		/* Current log exhausted: move on to the next member channel,
		 * wrapping around to the first one. */
		fclose (pcinfo->keyfile);
		prev = pcinfo->mlist->m_cursampleid;
		pcinfo->mlist->m_cursampleid ++;
		if (pcinfo->mlist->m_cursampleid >= pcinfo->mlist->m_totalchannel)
			pcinfo->mlist->m_cursampleid = 0;
		if ((pcinfo->keyfile = open_keyfile (pcinfo->mlist->m_lists[pcinfo->mlist->m_cursampleid])) == NULL)
		{
			PDEBUG ("Error in new keysample file\n");
			return -1;
		}
		/* fseek() takes (stream, offset, whence); the two trailing
		 * arguments used to be passed in the opposite order. */
		fseek (pcinfo->keyfile, 0, SEEK_SET);
		if (fread (&lrec, 1, sizeof (struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
		{
			PDEBUG ("Cannot read keysample\n");
			return -1;
		}
		/* Blocks left in the previous member plus the offset of the
		 * first record of the new one. */
		iddiff = pcinfo->mlist->m_lists[prev]->pcinfo->maxID + lrec.id - pcinfo->mlist->m_lastmaxid;
		slot = lrec.keysample;
		pcinfo->s.maxKeySample = lrec.keysample;
	} else
	{
		if ((slot = lrec.keysample - pcinfo->s.maxKeySample) < 0)
			slot = 0;
		iddiff = lrec.id - pcinfo->mlist->m_lastmaxid;
		pcinfo->s.maxKeySample += slot;
	}
	pcinfo->mlist->m_lastmaxid = lrec.id;
	pcinfo->s.maxBlockID += iddiff;
	timer_add (CurrentTime+slot, (TimerFunc)send_mplist_spupdate, pc, NULL);
	return 0;
}

/*
 * Initialise the runtime state of a multi-playlist channel P: default the
 * block size, set the queue length, open the key-sample log of the first
 * member channel and schedule the first send_mplist_spupdate() call.
 *
 * Returns 0 on success, -1 when the key-sample log cannot be opened.
 */
int init_mplistchannel (struct Channel *p)
{
	struct LiveChannelInfo *info = p->pcinfo;

	if (p->maxblocksize == 0)
		p->maxblocksize = DEFAULT_BLOCK;
	info->max_queue = BLOCK_PER_FILE;

	info->keyfile = open_keyfile (info->mlist->m_lists[0]);
	if (info->keyfile == NULL)
	{
		PDEBUG ("Error in new keysample file\n");
		return -1;
	}

	info->total = 0;
	info->isSave = 0;
	timer_add (CurrentTime, (TimerFunc)send_mplist_spupdate, p, NULL);
	return 0;
}


struct Channel *newMPListChannel (char *name, char *cmd5, float bitrate, int maxblocksize, int nchannel, struct Channel **pchannel)
{
	int i, id, startID=0;
	struct Channel *p;
	struct LiveChannelInfo *pcinfo;
	if (NumNewChannel >= MAX_CHANNEL) return (struct Channel *)0;
	p = (struct Channel *)calloc (sizeof (struct Channel), 1);
	memcpy (p->channel_md5, cmd5, MD5_LEN);
	if (name) strncpy (p->channel_name, name, sizeof (p->channel_name));
	p->channel_md5[MD5_LEN] = 0;
	p->upsize = 0;
	p->downsize = 0;
	p->maxblocksize = maxblocksize;
	p->ctime = time (NULL);

	p->pcinfo = (struct LiveChannelInfo *)calloc (sizeof (struct LiveChannelInfo), 1);
	pcinfo = p->pcinfo;
	pcinfo->bitrate = bitrate;
	pcinfo->mlist = (struct MList *)calloc (sizeof (struct MList), 1);
	pcinfo->mlist->m_totalchannel = nchannel;
	pcinfo->media = calloc (nchannel, sizeof (struct MediaData));

	pcinfo->max_channel = nchannel;
	for (i=0; i<nchannel; i++)
	{
		pchannel[i]->ref ++;
		pcinfo->mlist->m_lists[i] = pchannel[i];
		pcinfo->mlist->m_startID[i] = startID;
		pcinfo->maxID += pchannel[i]->pcinfo->maxID;
		addMedia (p, startID, pchannel[i]->pcinfo->maxID, pchannel[i]->pcinfo->media[0].dlen, pchannel[i]->pcinfo->media[0].data, pchannel[i]->channel_name);
		startID += pchannel[i]->pcinfo->maxID;
	}
	if (init_mplistchannel (p) < 0)
	{
		PDEBUG ("newPlistChannel error for %p.", p);
		free_livechannel (p);
		free (pcinfo);
		free (p);
		return (struct Channel *)0;
	}

	id = hash_str (p->channel_md5, MD5_LEN);
	PDEBUG("newMPlistChannel hash %.32s(fname=%s) to %d.\n", p->channel_md5, p->fname, id);
	p->next = ChannelHash[id];
	ChannelHash[id] = p;
	p->lnext = ChannelList;
	ChannelList = p;
	NumNewChannel ++;
	return p;
}

/*
 * Create a multi-playlist channel from the playlist description file
 * BUFFER and register it under the md5 MD5.
 *
 * File layout, one item per line:
 *   line 1   bitrate (parsed with atof)
 *   line 2   channel name
 *   line 3+  md5 of a member program; lines naming an unknown program are
 *            skipped
 *
 * At most MAX_FILEINPUT members are accepted; further lines are ignored,
 * since the member array has exactly that many slots.
 *
 * Returns the new channel, or NULL when BUFFER cannot be stat'ed or
 * opened, is not a regular file, names no known member program, or the
 * channel cannot be created.
 */
struct Channel *add_mplist_channel (char *buffer, char *md5)
{
	int i=0;
	FILE *in;
	struct stat stbuf;
	struct Channel *pc=NULL, *member, *pchannel[MAX_FILEINPUT];
	char *data=NULL, cname[MAX_DATA], buf[MAX_DATA];
	float bitrate=0.0;
	int bsize=16384, dlen=0;
	if (stat (buffer, &stbuf) < 0) return NULL;
	if (!S_ISREG (stbuf.st_mode)) return NULL;

	if ((in=fopen (buffer, "r")) == NULL)
	{
		PDEBUG ("Error in open file %s.\n", buffer);
		return NULL;
	}
	/* i counts the items accepted so far: 0 = expecting the bitrate,
	 * 1 = expecting the name, i >= 2 = i-2 members collected. */
	while (fgets (buf, MAX_DATA, in))
	{
		if (i == 0)
		{
			bitrate = atof (buf);
			i++;
		} else if (i == 1)
		{
			/* Cut at the first CR or LF; this also copes with CRLF
			 * line ends and with an empty string. */
			buf[strcspn (buf, "\r\n")] = 0;
			snprintf (cname, sizeof (cname), "%s", buf);
			i++;
		} else
		{
			if (i - 2 >= MAX_FILEINPUT)
			{
				PDEBUG ("Too many programs in %s, ignore the rest.\n", buffer);
				break;
			}
			buf[MD5_LEN] = 0;
			if ((member = getProgrambymd5 (buf, MD5_LEN)) != NULL)
			{
				pchannel[i-2] = member;
				i++;
			}
		}
	}
	fclose (in);
	if (i <= 2) return NULL;
	dlen = pchannel[0]->pcinfo->media[0].dlen;
	data = pchannel[0]->pcinfo->media[0].data;
	if ((pc=newMPListChannel (cname, md5, bitrate, bsize, i-2, pchannel)) == NULL)
	{
		PDEBUG ("Error in newMPListChannel %s.\n", md5);
		return NULL;
	}
	/* snprintf always terminates the copy, unlike strncpy. */
	snprintf (pc->fname, CHNLURL_LEN, "%s", buffer);
#ifdef TEST
	for (i=0; i<MAX_TS; i++) {
	if (dlen <= 0 || data == NULL || buildGTV (pc, dlen, data, i) < 0)
		continue;
	}
#endif
	return pc;
}

/*
 * apply_list() callback: for a channel that carries a playlist (mlist),
 * set its status to 1 when its description file p->fname can no longer be
 * stat'ed.  ARG is not used.
 */
void apply_update (struct Channel *p, void *arg)
{
	struct stat st;
	struct LiveChannelInfo *info = p->pcinfo;

	if (info == NULL || info->mlist == NULL)
		return;
	if (stat (p->fname, &st) != 0)
		info->status = 1;
}


/*
 * Scan the playlist directory PREFIX/PLIST_PREFIX/ and create a
 * multi-playlist channel for every entry that has no channel yet.
 *
 * Only entries whose name is exactly MD5_LEN characters long are
 * considered.  The channel md5 is the hex form of the md5 of
 * "<defaultspip>_<entry name>".  Channels that already exist get their
 * status reset to 0; afterwards apply_update() is run over all channels
 * and the global Changed flag is cleared.
 *
 * Returns 0 on success, -1 when the directory cannot be opened.
 */
int check_newplist ()
{
	DIR *dir;
	struct dirent *ent;
	struct Channel *known;
	char path[MAX_DATA];
	char digest[MD5_LEN+1], hex[MD5_LEN+1];
	int b;

	snprintf (path, MAX_DATA, "%s/%s/", PREFIX, PLIST_PREFIX);
	dir = opendir (path);
	if (dir == NULL)
		return -1;

	for (ent = readdir (dir); ent != NULL; ent = readdir (dir))
	{
		if (!strcmp (ent->d_name, ".") || !strcmp (ent->d_name, ".."))
			continue;
		if (strlen (ent->d_name) != MD5_LEN)
			continue;

		/* Derive the channel md5 from the server ip and the file name. */
		snprintf (path, MAX_DATA, "%s_%s", defaultspip, ent->d_name);
		md5_calc ((unsigned char *) digest,
				(unsigned char *) path, strlen (path));
		/* Two hex digits per digest byte. */
		for (b = 0; b < (MD5_LEN + 1) / 2; b++)
			sprintf (hex + 2 * b, "%02x", (unsigned char) digest[b]);
		hex[MD5_LEN] = 0;

		known = findChannel (hex, strlen (hex));
		if (known != NULL)
		{
			if (known->pcinfo != NULL)
				known->pcinfo->status = 0;
			continue;
		}

		snprintf (path, MAX_DATA, "%s/%s/%s", PREFIX, PLIST_PREFIX, ent->d_name);
		add_mplist_channel (path, hex);
	}
	closedir (dir);

	apply_list (ChannelList, apply_update, NULL);
	Changed = 0;
	return 0;
}

/*
 * Fetch block ID of the multi-playlist channel PC into BUF (capacity MAX).
 *
 * The id is reduced modulo the playlist's total number of blocks (maxID),
 * the member channel whose id range contains the result is looked up in
 * m_startID[], and the block is read from that member with
 * locateprog_by_id().  On success the id stored at the start of BUF is
 * replaced by the requested ID.
 *
 * Returns whatever locateprog_by_id() returns, or -1 when the channel has
 * no channel info, no playlist, an empty playlist (maxID == 0, which would
 * otherwise be a division by zero) or no member matches.
 */
int locate_mplist_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
{
	int result;
	int i, j;
	struct LiveChannelInfo *c = pc->pcinfo;
	if (c == NULL)
	{
		PDEBUG ("c is null.\n");
		return -1;
	}
	if (c->mlist == NULL || c->maxID == 0)
	{
		PDEBUG ("Channel has no playlist data.\n");
		return -1;
	}
	i = id % c->maxID;
	/* Find the first member starting beyond i; the one before it holds i. */
	for (j=0; j<c->mlist->m_totalchannel; j++)
		if (c->mlist->m_startID[j] > i) break;
	j--;
	if (j < 0)
	{
		PDEBUG ("Internal error in locate id %d.\n", id);
		return -1;
	}
	if ((result = locateprog_by_id (c->mlist->m_lists[j], i-c->mlist->m_startID[j], buf, max)) > 0)
		((int *)buf)[0] = id;
	return result;
}

/*
 * Open the next block file "<p->fname>/<numinput>" of channel P and append
 * it to pcinfo->input[].
 *
 * Returns  1 when the file still has room for blocks (it is new, or holds
 *            fewer than max_queue records),
 *          0 when the file already exists and is full, i.e. not writable;
 *            the caller is expected to call again for the next file,
 *         -1 on error: the maximum number of files is reached, the file
 *            cannot be opened, or its size is not a whole number of
 *            records.
 *
 * The size of an existing file is validated before it is opened, so the
 * error path neither leaves a stream open in an unreachable input[] slot
 * nor changes numblocks/maxID.
 */
inline int newChannelFile (struct Channel *p)
{
	int result;
	struct stat stbuf;
	char buffer[MAX_LINE];
	struct LiveChannelInfo *pcinfo = p->pcinfo;
	if (pcinfo->numinput >= MAX_FILEINPUT)
	{
		PDEBUG ("Max file input has been reached, %s:%d.\n", p->fname,pcinfo->numinput);
		return -1;
	}
	snprintf (buffer, MAX_LINE, "%s/%d", p->fname, pcinfo->numinput);
	if (stat (buffer, &stbuf) == 0)
	{
		/* A record is the 2-int header plus maxblocksize payload bytes. */
		if (stbuf.st_size % (p->maxblocksize + 2*sizeof(int))) return -1;
		if (stbuf.st_size / (p->maxblocksize+2*sizeof (int)) < pcinfo->max_queue)
		{
			if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "a+")) == NULL)
			{
				PDEBUG ("Cannot open file %s.\n", buffer);
				return -1;
			}
			result = 1;
		} else
		{
			if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "r+")) == NULL)
			{
				PDEBUG ("Cannot open file %s.\n", buffer);
				return -1;
			}
			result = 0;
		}
		pcinfo->numblocks = stbuf.st_size /(p->maxblocksize+2*sizeof (int));
		pcinfo->maxID += stbuf.st_size / (p->maxblocksize+2*sizeof(int));
	} else
	{
		if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "w+")) == NULL)
		{
			PDEBUG ("Cannot open file %s.\n", buffer);
			return -1;
		}
		pcinfo->numblocks = 0;
		result = 1;
	}
	pcinfo->numinput ++;
	return result;
}

/*
 * Prepare the on-disk storage of live channel P: make sure p->fname is a
 * directory, open the block files until one with free room is found (it
 * becomes p->db) and open the key-sample log for appending.
 *
 * Returns 0 on success, -1 when p->fname exists but is not a directory,
 * when no block file can be opened, or when the key-sample log is not a
 * regular file or cannot be opened.
 */
int init_livechannel (struct Channel *p)
{
	int rc;
	struct stat st;
	char keypath[MAX_LINE];
	struct LiveChannelInfo *info = p->pcinfo;

	if (stat (p->fname, &st) == 0 && (!S_ISDIR (st.st_mode)))
	{
		PDEBUG ("directory %s not exist or not a dir.\n", p->fname);
		return -1;
	}
	mkdir (p->fname, 0777);

	if (p->maxblocksize == 0)
		p->maxblocksize = DEFAULT_BLOCK;
	info->max_queue = BLOCK_PER_FILE;

	/* Skip over files that are already full (result 0). */
	do
	{
		rc = newChannelFile (p);
	} while (rc == 0);
	if (rc < 0)
		return -1;
	p->db = info->input[info->numinput-1];

	snprintf (keypath, MAX_LINE, "%s/keysample", p->fname);
	if (stat (keypath, &st) == 0 && !S_ISREG (st.st_mode))
	{
		PDEBUG ("File %s exist and not a regular file", keypath);
		return -1;
	}

	info->keyfile = fopen (keypath, "a+");
	if (info->keyfile == NULL)
	{
		PDEBUG ("File %s can not be opened.\n", keypath);
		return -1;
	}
	info->total = 0;
	return 0;
}


/*
 * Read the stored block with identifier ID of channel PC into BUF
 * (capacity MAX bytes).
 *
 * Blocks are spread over the files in pcinfo->input[]: relative to
 * startid, block k lives in file k / max_queue at record k % max_queue,
 * a record being maxblocksize + 2*sizeof(int) bytes (two ints - id and
 * payload size - followed by the payload).
 *
 * Returns the payload size on success, -2 when BUF is too small for one
 * record, and -1 on every other failure.  On success pc->upsize is
 * increased by the payload size.
 */
int locate_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
{
	int i, pos, *msg;
	struct LiveChannelInfo *c = pc->pcinfo;
	/* Test for NULL separately: the range diagnostic below reads c. */
	if (c == NULL)
	{
		PDEBUG ("c is null.\n");
		return -1;
	}
	if (id > c->maxID)
	{
		PDEBUG ("c is %p and id is (%d,%d).\n", (void *)c, c->maxID, id);
		return -1;
	}
	/* A negative max would turn into a huge value in the size_t comparison. */
	if (max < 0 || pc->maxblocksize + 2*sizeof(int) > (size_t)max)
	{
		PDEBUG ("too small buffer %d for %d", max, pc->maxblocksize);
		return -2;
	}
	if (c->max_queue == 0)
	{
		PDEBUG ("c->max_queue is 0\n");
		return -1;
	}
	i = (id - c->startid) % c->max_queue;
	pos = (id - c->startid) / c->max_queue;
	if (pos < 0 || pos >= c->numinput || c->input[pos] == NULL)
	{
		/* Only look at input[pos] when pos is a valid index. */
		PDEBUG ("file %d does not exist. (%d,%p)\n", pos, c->numinput,
				(pos >= 0 && pos < c->numinput) ? (void *)c->input[pos] : (void *)0);
		return -1;
	}

	if (fseeko (c->input[pos], ((off_t)(i)) * (pc->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
	{
		PDEBUG ("Fssek failed. (%d, %d, %d)\n", pos, i, pc->maxblocksize);
		return -1;
	}
	/* Need more than the header, and at least as many bytes as the
	 * header's size field announces.  From here on i is the byte count
	 * returned by fread. */
	if ((i=fread (buf, 1, pc->maxblocksize+2*sizeof (int), c->input[pos])) <= 2*sizeof (int) || i < ((int *)buf)[1]+2*sizeof(int))
	{
		PDEBUG ("Fread failed. (%d, %d, %d)\n", pos, i, pc->maxblocksize);
		return -1;
	}
	msg = (int *)buf;
	if (msg[1] > pc->maxblocksize || msg[1] <= 0)
	{
		PDEBUG ("msg format error. (%d, %d, %d)\n", msg[0], msg[1], pc->maxblocksize);
		return -1;
	}
	PINFO ("Found block. (%d, %d, %d, %d)\n", id, msg[0], msg[1], pc->maxblocksize);
	pc->upsize += msg[1];
	return msg[1];
}

/*
 * Append one received block to the storage of channel C.
 *
 * BUF starts with two ints (block id, payload size) followed by the
 * payload.  The id sent by the source is only used for diagnostics; the
 * block is renumbered with the channel's running counter pcinfo->maxID and
 * written as a fixed-size record (maxblocksize + 2*sizeof(int) bytes) at
 * record index numblocks of the current file c->db.  The third int of BUF
 * is read as the offset of a key sample inside the payload; when present
 * it updates the channel's SPUpdate state and is logged to the key-sample
 * file.  When the current file is full the next one is opened.
 *
 * P is the session the block came from and is recorded as data source.
 *
 * Returns the payload size on success, 0 when the block is rejected
 * because of its size, and -1 on missing channel info or file errors.
 */
int saveBlock (struct Channel *c, char *buf, struct Session *p)
{
	struct logrec lrec;
	int id, size;
	int j=0;
	unsigned long long keysample;
	struct SPUpdate *s;
	struct LiveChannelInfo *pcinfo;

	if ((!c) || (pcinfo = c->pcinfo) == NULL)
	{
		PDEBUG ("saveBlock c is null.\n");
		return -1;
	}
	assert (buf);
	/* Id as sent by the source, needed by the rejection messages below
	 * before the local id has been assigned. */
	id = ((int *)buf)[0];
	size = ((int *)buf)[1];
	if (size > MAX_BLOCK_SIZE || size <= MIN_BLOCK_SIZE)
	{
		PDEBUG ("saveBlock size is %d and id is %d.\n", size, id);
		return 0;
	}

	/* The first accepted block fixes the record size of the channel. */
	if (c->maxblocksize == 0)
	{
		c->maxblocksize = size;
		pcinfo->max_queue = BLOCK_PER_FILE;
	}
	else if (size > c->maxblocksize)
	{
		PDEBUG ("saveBlock maxblocksize is %d, size is %d and id is %d.\n", c->maxblocksize, size, id);
		return 0;
	}
	if (pcinfo->max_queue == 0)
	{
		PDEBUG ("c->max_queue is 0\n");
		return -1;
	}

	/* Renumber the block with the channel's own running id. */
	id = pcinfo->maxID;
	((int *)buf)[0] = id;
	if (fseeko (c->db, ((off_t)(pcinfo->numblocks)) * (c->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
	{
		PDEBUG ("Error in fsseko.\n");
		return -1;
	}

	/* NOTE(review): a full record is written even when size is smaller
	 * than maxblocksize, so buf is presumably at least
	 * maxblocksize + 2*sizeof(int) bytes long - confirm against callers. */
	if (fwrite (buf, c->maxblocksize+2*sizeof(int), 1, c->db) != 1)
	{
		PDEBUG ("fwrite error in saveBlock:%.32s:%s", c->channel_md5, c->fname);
		return -1;
	}

	j = ((int *)buf)[2];
	if (j <= 0 || j >= size-sizeof(keysample)) keysample = 0;
	/* The offset is arbitrary, so copy the bytes instead of reading
	 * through a possibly misaligned pointer. */
	else memcpy (&keysample, buf+2*sizeof(int)+j, sizeof (keysample));
	s = &(pcinfo->s);
	if (keysample > 0)
	{
		if(s->maxKeySample < keysample)
		{
			/* The increase must not be larger than 1000 seconds. */
			if(s->maxKeySample == 0 || (keysample-s->maxKeySample)/10000000 < 1000)
			{
				s->maxKeySample = keysample;
			} else
				PDEBUG("Error Keysample at block %d,(%lld,%lld). \n", id, keysample, s->maxKeySample);
		}
		if (s->minKeySample == 0 || keysample < s->minKeySample)
		{
			s->minKeySample = keysample;
		}
		lrec.id = id;
		lrec.keysample = keysample/10000000;
		fwrite (&lrec, sizeof (lrec), 1, pcinfo->keyfile);
		fflush (pcinfo->keyfile);
	}
	if (id < s->minBlockID || s->maxBlockID == 0)
	{
		s->minBlockID = id;
	}
	if (id > s->maxBlockID)
	{
		s->maxBlockID = id;
	}

	if (pcinfo->dataSource != p)
	{
		PDEBUG ("dataSource %p is not equal p %p.\n", pcinfo->dataSource, p);
		pcinfo->dataSource = p;
	}


	fflush (c->db);
	pcinfo->total ++;
	pcinfo->maxID ++;
	pcinfo->numblocks ++;
	c->downsize += size;
	if (pcinfo->numblocks >= pcinfo->max_queue)
	{
		/* Current file is full: move on to the next writable one. */
		while ((j = newChannelFile (c)) == 0);
		if (j < 0) return -1;
		c->db = pcinfo->input[pcinfo->numinput-1];
	}
	return size;
}

/*
 * Release the live-channel resources of P: close (and, unless isSave is
 * set, delete) its block files and key-sample log, close its data-source
 * session, drop the references a playlist channel holds on its members
 * and free the media data.
 *
 * p->pcinfo itself is not freed here.  Always returns 0.
 */
int free_livechannel (struct Channel *p)
{
	char buffer[MAX_LINE];
	struct LiveChannelInfo *pcinfo = p->pcinfo;
	int i;
	/* Presumably a final notification to the peers before teardown --
	 * TODO confirm against send_all_spupdate(). */
	send_all_spupdate (p, NULL);
	for (i=0; i<pcinfo->numinput; i++)
	{
		if (pcinfo->input[i]) fclose (pcinfo->input[i]);
		if (!pcinfo->isSave)
		{
			snprintf (buffer, MAX_LINE, "%s/%d", p->fname, i);
			unlink (buffer);
		}
	}
	/* p->db was one of the input[] streams closed above (it is assigned
	 * from input[] wherever this file sets it). */
	p->db = NULL;
	if (pcinfo->keyfile != NULL) fclose (pcinfo->keyfile);
	if (!pcinfo->isSave)
	{
		/* Remove the key-sample log and then the channel directory. */
		snprintf (buffer, MAX_LINE, "%s/keysample", p->fname);
		unlink (buffer);
		rmdir (p->fname);
	}
	if (pcinfo->dataSource != NULL)
	{
		/* Detach the session from the channel before closing it. */
		pcinfo->dataSource->pc = NULL;
		Clientclosure (pcinfo->dataSource - TRACKER[TYPE_CS].head, TYPE_CS);
		pcinfo->dataSource = NULL;
	}
	if (pcinfo->mlist)
	{
		/* Playlist channel: cancel its timer, then release every member
		 * and free members that are no longer referenced. */
		timer_remove (p, NULL);
		for (i=0; i<pcinfo->max_channel; i++)
		{
			if (pcinfo->mlist->m_lists[i] != NULL)
			{
				pcinfo->mlist->m_lists[i]->ref --;
				if (pcinfo->mlist->m_lists[i]->ref <= 0)
					freeProgram (pcinfo->mlist->m_lists[i], NULL);
			}
		}
		free (pcinfo->mlist);
	}
	freeMedia (p);
#ifdef TEST
	for (i=0; i<MAX_TS; i++)
	{
		sprintf (buffer, "%s/%s/%s.gtv", WWW_ROOT, NET_NAME[i], p->channel_name);
		remove (buffer);
	}
#endif
	return 0;
}
#endif

/*
 * Create a live channel and register it in the global channel table and
 * list.
 *
 *   name         display name, may be NULL
 *   source       session delivering the channel's blocks
 *   cmd5         channel md5 (MD5_LEN bytes are copied); also determines
 *                the on-disk path
 *   bitrate      stored in the channel info
 *   maxblocksize block size of the channel
 *
 * Returns the new channel, or NULL when the channel table is full, memory
 * cannot be allocated or the channel storage cannot be initialised.  On
 * initialisation failure the partially built channel is torn down with
 * free_livechannel().
 */
struct Channel *newLiveChannel (char *name, struct Session *source, char *cmd5, float bitrate, int maxblocksize)
{
	int id;
	struct Channel *p;
	struct LiveChannelInfo *pcinfo;
	if (NumNewChannel >= MAX_CHANNEL) return (struct Channel *)0;
	p = (struct Channel *)calloc (sizeof (struct Channel), 1);
	if (p == NULL) return (struct Channel *)0;
	memcpy (p->channel_md5, cmd5, MD5_LEN);
	/* Copy at most size-1 bytes: the calloc'ed buffer then always keeps
	 * its terminating NUL, which strncpy alone does not guarantee. */
	if (name) strncpy (p->channel_name, name, sizeof (p->channel_name) - 1);
	p->channel_md5[MD5_LEN] = 0;
	p->upsize = 0;
	p->downsize = 0;
	p->maxblocksize = maxblocksize;
	p->ctime = time (NULL);

	pcinfo = (struct LiveChannelInfo *)calloc (sizeof (struct LiveChannelInfo), 1);
	if (pcinfo == NULL)
	{
		free (p);
		return (struct Channel *)0;
	}
	p->pcinfo = pcinfo;
	pcinfo->dataSource = source;
	pcinfo->bitrate = bitrate;

	buildLivePath (p->fname, CHNLURL_LEN, cmd5);
	if (init_livechannel (p) < 0)
	{
		PDEBUG ("newLiveChannel error for %p.", p);
		free_livechannel (p);
		free (pcinfo);
		free (p);
		return (struct Channel *)0;
	}

	/* Link at the head of the hash bucket and of the global list. */
	id = hash_str (p->channel_md5, MD5_LEN);
	PDEBUG("newLiveChannel hash %.32s(fname=%s) to %d.\n", p->channel_md5, p->fname, id);
	p->next = ChannelHash[id];
	ChannelHash[id] = p;
	p->lnext = ChannelList;
	ChannelList = p;
	NumNewChannel ++;
	return p;
}
